package com.hjs.homework.window;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

import java.util.Collection;

/**
 * Flink streaming job demonstrating processing-time session windows.
 *
 * <p>The job reads text lines from a socket, keys the stream by the line itself
 * (so every distinct line forms its own key), groups the elements of each key
 * into session windows with a {@value #SESSION_GAP_SECONDS}-second gap, and
 * prints the concatenation of all lines collected in each window.
 *
 * <p>Usage: {@code SessionWindowJava [host] [port]}. Both arguments are optional
 * and fall back to {@value #DEFAULT_HOST} and {@value #DEFAULT_PORT}.
 */
public class SessionWindowJava {

    /** Socket host used when no host argument is supplied. */
    private static final String DEFAULT_HOST = "bogon";

    /** Socket port used when no port argument is supplied. */
    private static final int DEFAULT_PORT = 7777;

    /** Session gap, in seconds, passed to the session window assigner. */
    private static final long SESSION_GAP_SECONDS = 5;

    /**
     * Builds and executes the session-window pipeline.
     *
     * @param args optional {@code [host] [port]} of the text socket to read from
     * @throws IllegalArgumentException if the port argument is not an integer in 1..65535
     * @throws Exception if the Flink job fails to execute
     */
    public static void main(String[] args) throws Exception {
        // Allow the source endpoint to be overridden without recompiling.
        String host = args.length > 0 ? args[0] : DEFAULT_HOST;
        int port = args.length > 1 ? parsePort(args[1]) : DEFAULT_PORT;

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<String> lines = env.socketTextStream(host, port);

        // Key by the line's own value: identical lines end up in the same session.
        KeyedStream<String, String> keyedLines = lines.keyBy(value -> value);

        WindowedStream<String, String, TimeWindow> sessionWindows =
                keyedLines.window(ProcessingTimeSessionWindows.withGap(Time.seconds(SESSION_GAP_SECONDS)));

        // An anonymous class (not a lambda) keeps the generic parameters explicit
        // for this window function.
        SingleOutputStreamOperator<String> concatenated =
                sessionWindows.apply(new WindowFunction<String, String, String, TimeWindow>() {
                    @Override
                    public void apply(String key, TimeWindow window, Iterable<String> elements,
                                      Collector<String> out) throws Exception {
                        // Emit one record per window: all of its elements joined with no separator.
                        StringBuilder joined = new StringBuilder();
                        for (String element : elements) {
                            joined.append(element);
                        }
                        out.collect(joined.toString());
                    }
                });

        concatenated.print();

        env.execute("SessionWindowJava");
    }

    /**
     * Parses a TCP port number from a command-line argument.
     *
     * @param raw the textual port value
     * @return the parsed port
     * @throws IllegalArgumentException if {@code raw} is not an integer in 1..65535
     */
    private static int parsePort(String raw) {
        final int parsed;
        try {
            parsed = Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException("Port must be an integer, got: " + raw, e);
        }
        if (parsed < 1 || parsed > 65535) {
            throw new IllegalArgumentException("Port must be in 1..65535, got: " + parsed);
        }
        return parsed;
    }
}
